/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.gcp.credentials.service;

import com.google.auth.http.HttpTransportFactory;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.reporting.InitializationException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.DELEGATION_STRATEGY;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.DELEGATION_USER;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.LEGACY_USE_APPLICATION_DEFAULT_CREDENTIALS;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.LEGACY_USE_COMPUTE_ENGINE_CREDENTIALS;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.WORKLOAD_IDENTITY_AUDIENCE;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SCOPE;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SUBJECT_TOKEN_PROVIDER;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.WORKLOAD_IDENTITY_TOKEN_ENDPOINT;

/**
 * Controller service implementation of {@link GCPCredentialsService}.
 *
 * <p>The service builds a {@link GoogleCredentials} instance from its configured properties (and the
 * optional proxy configuration) when it is enabled, and hands that instance out through
 * {@link #getGoogleCredentials()}. It also supports configuration verification and migration of
 * legacy property names.</p>
 *
 * @see GCPCredentialsService
 */
@CapabilityDescription("Defines credentials for Google Cloud Platform processors. " +
        "Uses Application Default credentials without configuration. " +
        "Application Default credentials support environmental variable (GOOGLE_APPLICATION_CREDENTIALS) pointing to " +
        "a credential file, the config generated by `gcloud auth application-default login`, AppEngine/Compute Engine" +
        " service accounts, etc.")
@Tags({ "gcp", "credentials", "provider" })
@Restricted(
        restrictions = {
                @Restriction(
                        requiredPermission = RequiredPermission.ACCESS_ENVIRONMENT_CREDENTIALS,
                        explanation = "The default configuration can read environment variables and system properties for credentials"
                )
        }
)
public class GCPCredentialsControllerService extends AbstractControllerService implements GCPCredentialsService, VerifiableControllerService {

    /** Name of the single verification step reported by {@link #verify}. */
    private static final String VERIFICATION_STEP_NAME = "Provide Google Credentials";

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            AUTHENTICATION_STRATEGY,
            SERVICE_ACCOUNT_JSON_FILE,
            SERVICE_ACCOUNT_JSON,
            WORKLOAD_IDENTITY_AUDIENCE,
            WORKLOAD_IDENTITY_SCOPE,
            WORKLOAD_IDENTITY_TOKEN_ENDPOINT,
            WORKLOAD_IDENTITY_SUBJECT_TOKEN_PROVIDER,
            WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE,
            ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxyAwareTransportFactory.PROXY_SPECS),
            DELEGATION_STRATEGY,
            DELEGATION_USER
    );

    /** Credentials built by {@link #onConfigured}; {@code null} until that method has completed successfully. */
    private volatile GoogleCredentials googleCredentials;

    /** Factory that turns the configured properties into a {@link GoogleCredentials} instance. */
    protected final CredentialsFactory credentialsProviderFactory = new CredentialsFactory();

    /**
     * @return the fixed list of property descriptors supported by this service
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the credentials that were created when the service was last configured.
     *
     * @return the cached credentials, or {@code null} if {@link #onConfigured} has not yet stored any
     * @throws ProcessException declared by the interface; this implementation only reads a field
     */
    @Override
    public GoogleCredentials getGoogleCredentials() throws ProcessException {
        return googleCredentials;
    }

    /**
     * Validates the proxy configuration against the proxy specs supported by
     * {@link ProxyAwareTransportFactory}.
     *
     * @param validationContext the context holding the property values to validate
     * @return the validation results collected by the proxy spec validation
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
        return results;
    }

    /**
     * Verifies the configuration by attempting to build credentials from the given context.
     *
     * @param context the configuration to verify
     * @param verificationLogger logger supplied by the framework (not used by this implementation)
     * @param variables variables supplied by the framework (not used by this implementation)
     * @return a single-element list holding a SUCCESSFUL result naming the credentials class, or a
     *         FAILED result carrying the message of the {@link IOException} that was thrown
     */
    @Override
    public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
        ConfigVerificationResult result;
        try {
            final GoogleCredentials credentials = getGoogleCredentials(context);
            result = new ConfigVerificationResult.Builder()
                    .verificationStepName(VERIFICATION_STEP_NAME)
                    .outcome(Outcome.SUCCESSFUL)
                    .explanation(String.format("Successfully provided [%s] as Google Credentials", credentials.getClass().getSimpleName()))
                    .build();
        } catch (final IOException e) {
            // The exception message is passed as a format ARGUMENT, never as part of the format string:
            // a message containing '%' would otherwise make String.format itself throw.
            result = new ConfigVerificationResult.Builder()
                    .verificationStepName(VERIFICATION_STEP_NAME)
                    .outcome(Outcome.FAILED)
                    .explanation(String.format("Failed to provide Google Credentials: %s", e.getMessage()))
                    .build();
        }
        return Collections.singletonList(result);
    }

    /**
     * Builds the credentials from the given context and caches them for {@link #getGoogleCredentials()}.
     *
     * @param context the configuration of this service
     * @throws InitializationException wrapping any {@link IOException} raised while building the credentials
     */
    @OnEnabled
    public void onConfigured(final ConfigurationContext context) throws InitializationException {
        try {
            googleCredentials = getGoogleCredentials(context);
        } catch (final IOException e) {
            throw new InitializationException(e);
        }
    }

    /**
     * Migrates legacy property names to the current descriptors and derives the authentication
     * strategy when it is missing or blank, or when a legacy boolean flag is present. The legacy
     * flags are removed at the end.
     *
     * @param config the property configuration to migrate in place
     */
    @Override
    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("application-default-credentials", LEGACY_USE_APPLICATION_DEFAULT_CREDENTIALS.getName());
        config.renameProperty("compute-engine-credentials", LEGACY_USE_COMPUTE_ENGINE_CREDENTIALS.getName());
        config.renameProperty("service-account-json-file", SERVICE_ACCOUNT_JSON_FILE.getName());
        config.renameProperty("service-account-json", SERVICE_ACCOUNT_JSON.getName());
        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);

        final boolean legacyFlagsPresent = config.hasProperty(LEGACY_USE_APPLICATION_DEFAULT_CREDENTIALS)
                || config.hasProperty(LEGACY_USE_COMPUTE_ENGINE_CREDENTIALS);
        // A strategy value that is absent or only whitespace counts as missing.
        final Optional<String> authenticationStrategyValue = config.getRawPropertyValue(AUTHENTICATION_STRATEGY)
                .map(String::trim)
                .filter(value -> !value.isEmpty());
        final boolean authenticationStrategyMissing = authenticationStrategyValue.isEmpty();

        if (authenticationStrategyMissing || legacyFlagsPresent) {
            // determineAuthenticationStrategy always yields a strategy, so no null guard is needed.
            config.setProperty(AUTHENTICATION_STRATEGY, determineAuthenticationStrategy(config).getValue());
        }

        config.removeProperty(LEGACY_USE_APPLICATION_DEFAULT_CREDENTIALS.getName());
        config.removeProperty(LEGACY_USE_COMPUTE_ENGINE_CREDENTIALS.getName());
    }

    /**
     * Picks the authentication strategy implied by the given configuration. The checks are
     * evaluated in order and the first match wins; when nothing matches, Application Default is used.
     *
     * @param config the property configuration being migrated
     * @return the selected strategy, never {@code null}
     */
    private AuthenticationStrategy determineAuthenticationStrategy(final PropertyConfiguration config) {
        if (isTrue(config, LEGACY_USE_APPLICATION_DEFAULT_CREDENTIALS)) {
            return AuthenticationStrategy.APPLICATION_DEFAULT;
        } else if (config.isPropertySet(SERVICE_ACCOUNT_JSON_FILE)) {
            return AuthenticationStrategy.SERVICE_ACCOUNT_JSON_FILE;
        } else if (config.isPropertySet(WORKLOAD_IDENTITY_SUBJECT_TOKEN_PROVIDER)) {
            return AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION;
        } else if (config.isPropertySet(SERVICE_ACCOUNT_JSON)) {
            return AuthenticationStrategy.SERVICE_ACCOUNT_JSON;
        } else if (isTrue(config, LEGACY_USE_COMPUTE_ENGINE_CREDENTIALS)) {
            return AuthenticationStrategy.COMPUTE_ENGINE;
        }
        return AuthenticationStrategy.APPLICATION_DEFAULT;
    }

    /**
     * Tells whether the raw value of a property is the string "true", ignoring case and
     * surrounding whitespace.
     *
     * @param config the property configuration to read from
     * @param property the property to inspect
     * @return {@code true} only when a raw value exists and equals "true" after trimming
     */
    private boolean isTrue(final PropertyConfiguration config, final PropertyDescriptor property) {
        return config.getRawPropertyValue(property)
                .map(value -> "true".equalsIgnoreCase(value.trim()))
                .orElse(false);
    }

    /**
     * Builds credentials from the given context using a transport factory that honours the
     * proxy configuration resolved from the same context.
     *
     * @param context the configuration to build credentials from
     * @return the credentials produced by the credentials factory
     * @throws IOException if the credentials factory fails to create the credentials
     */
    private GoogleCredentials getGoogleCredentials(final ConfigurationContext context) throws IOException {
        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
        final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
        return credentialsProviderFactory.getGoogleCredentials(context, transportFactory);
    }

    /**
     * @return a short description containing the identifier of this service
     */
    @Override
    public String toString() {
        return "GCPCredentialsControllerService[id=" + getIdentifier() + "]";
    }
}
